/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.util.concurrent;

import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Single-thread singleton {@link EventExecutor}.  It starts the thread automatically and stops it
 * when there is no task pending in the task queue for 1 second.  Please note it is not scalable to
 * schedule large number of tasks to this executor; use a dedicated executor.
 */
public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {

  private static final InternalLogger logger = InternalLoggerFactory
      .getInstance(GlobalEventExecutor.class);

  private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);

  public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();

  final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
  final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
      this, Executors.<Void>callable(new Runnable() {
    @Override
    public void run() {
      // NOOP
    }
  }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL),
      -SCHEDULE_QUIET_PERIOD_INTERVAL);

  // because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this
  // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
  // be sticky about its thread group
  // visible for testing
  final ThreadFactory threadFactory =
      new DefaultThreadFactory(DefaultThreadFactory.toPoolName(getClass()), false,
          Thread.NORM_PRIORITY, null);
  private final TaskRunner taskRunner = new TaskRunner();
  private final AtomicBoolean started = new AtomicBoolean();
  volatile Thread thread;

  private final Future<?> terminationFuture = new FailedFuture<Object>(this,
      new UnsupportedOperationException());

  private GlobalEventExecutor() {
    scheduledTaskQueue().add(quietPeriodTask);
  }

  /**
   * Take the next {@link Runnable} from the task queue and so will block if no task is currently
   * present.
   *
   * @return {@code null} if the executor thread has been interrupted or waken up.
   */
  Runnable takeTask() {
    BlockingQueue<Runnable> taskQueue = this.taskQueue;
    for (; ; ) {
      ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
      if (scheduledTask == null) {
        Runnable task = null;
        try {
          task = taskQueue.take();
        } catch (InterruptedException e) {
          // Ignore
        }
        return task;
      } else {
        long delayNanos = scheduledTask.delayNanos();
        Runnable task;
        if (delayNanos > 0) {
          try {
            task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
          } catch (InterruptedException e) {
            // Waken up.
            return null;
          }
        } else {
          task = taskQueue.poll();
        }

        if (task == null) {
          fetchFromScheduledTaskQueue();
          task = taskQueue.poll();
        }

        if (task != null) {
          return task;
        }
      }
    }
  }

  private void fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    Runnable scheduledTask = pollScheduledTask(nanoTime);
    while (scheduledTask != null) {
      taskQueue.add(scheduledTask);
      scheduledTask = pollScheduledTask(nanoTime);
    }
  }

  /**
   * Return the number of tasks that are pending for processing.
   *
   * <strong>Be aware that this operation may be expensive as it depends on the internal
   * implementation of the
   * SingleThreadEventExecutor. So use it was care!</strong>
   */
  public int pendingTasks() {
    return taskQueue.size();
  }

  /**
   * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance
   * was shutdown before.
   */
  private void addTask(Runnable task) {
    if (task == null) {
      throw new NullPointerException("task");
    }
    taskQueue.add(task);
  }

  @Override
  public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
  }

  @Override
  public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
    return terminationFuture();
  }

  @Override
  public Future<?> terminationFuture() {
    return terminationFuture;
  }

  @Override
  @Deprecated
  public void shutdown() {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean isShuttingDown() {
    return false;
  }

  @Override
  public boolean isShutdown() {
    return false;
  }

  @Override
  public boolean isTerminated() {
    return false;
  }

  @Override
  public boolean awaitTermination(long timeout, TimeUnit unit) {
    return false;
  }

  /**
   * Waits until the worker thread of this executor has no tasks left in its task queue and
   * terminates itself. Because a new worker thread will be started again when a new task is
   * submitted, this operation is only useful when you want to ensure that the worker thread is
   * terminated <strong>after</strong> your application is shut down and there's no chance of
   * submitting a new task afterwards.
   *
   * @return {@code true} if and only if the worker thread has been terminated
   */
  public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
    if (unit == null) {
      throw new NullPointerException("unit");
    }

    final Thread thread = this.thread;
    if (thread == null) {
      throw new IllegalStateException("thread was not started");
    }
    thread.join(unit.toMillis(timeout));
    return !thread.isAlive();
  }

  @Override
  public void execute(Runnable task) {
    if (task == null) {
      throw new NullPointerException("task");
    }

    addTask(task);
    if (!inEventLoop()) {
      startThread();
    }
  }

  private void startThread() {
    if (started.compareAndSet(false, true)) {
      final Thread t = threadFactory.newThread(taskRunner);
      // Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
      // classloader.
      // See:
      // - https://github.com/netty/netty/issues/7290
      // - https://bugs.openjdk.java.net/browse/JDK-7008595
      AccessController.doPrivileged(new PrivilegedAction<Void>() {
        @Override
        public Void run() {
          t.setContextClassLoader(null);
          return null;
        }
      });

      // Set the thread before starting it as otherwise inEventLoop() may return false and so produce
      // an assert error.
      // See https://github.com/netty/netty/issues/4357
      thread = t;
      t.start();
    }
  }

  final class TaskRunner implements Runnable {

    @Override
    public void run() {
      for (; ; ) {
        Runnable task = takeTask();
        if (task != null) {
          try {
            task.run();
          } catch (Throwable t) {
            logger.warn("Unexpected exception from the global event executor: ", t);
          }

          if (task != quietPeriodTask) {
            continue;
          }
        }

        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
        // Terminate if there is no task in the queue (except the noop task).
        if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
          // Mark the current thread as stopped.
          // The following CAS must always success and must be uncontended,
          // because only one thread should be running at the same time.
          boolean stopped = started.compareAndSet(true, false);
          assert stopped;

          // Check if there are pending entries added by execute() or schedule*() while we do CAS above.
          if (taskQueue.isEmpty() && (scheduledTaskQueue == null
              || scheduledTaskQueue.size() == 1)) {
            // A) No new task was added and thus there's nothing to handle
            //    -> safe to terminate because there's nothing left to do
            // B) A new thread started and handled all the new tasks.
            //    -> safe to terminate the new thread will take care the rest
            break;
          }

          // There are pending tasks added again.
          if (!started.compareAndSet(false, true)) {
            // startThread() started a new thread and set 'started' to true.
            // -> terminate this thread so that the new thread reads from taskQueue exclusively.
            break;
          }

          // New tasks were added, but this worker was faster to set 'started' to true.
          // i.e. a new worker thread was not started by startThread().
          // -> keep this thread alive to handle the newly added entries.
        }
      }
    }
  }
}
